# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Duration strings used in this file (for example 'PT1M') are textual
# representations of a duration. The accepted formats are based on the
# ISO-8601 duration format PnDTnHnMn.nS, with days considered to be
# exactly 24 hours.
#
# Examples:
#
#    "PT20.345S" -- parses as "20.345 seconds"
#    "PT15M"     -- parses as "15 minutes" (where a minute is 60 seconds)
#    "PT10H"     -- parses as "10 hours" (where an hour is 3600 seconds)
#    "P2D"       -- parses as "2 days" (where a day is 24 hours or 86400 seconds)
#    "P2DT3H4M"  -- parses as "2 days, 3 hours and 4 minutes"
#    "PT-6H3M"   -- parses as "-6 hours and +3 minutes"
#    "-PT6H3M"   -- parses as "-6 hours and -3 minutes"
#    "-PT-6H+3M" -- parses as "+6 hours and -3 minutes"
#
# Top-level settings shared by every rule under `metricsRules`.
#
# filter:       closure over the sample tags; it is true only when the
#               `job_name` tag equals 'kafka-monitoring'.
# expSuffix:    rewrites the `cluster` tag to 'kafka::' + its original value,
#               then calls instance(['cluster'], ['broker'], Layer.KAFKA).
#               NOTE(review): presumably appended to each rule's `exp`, mapping
#               `cluster` to the service and `broker` to the instance in the
#               KAFKA layer - confirm against the MAL documentation.
# metricPrefix: presumably prepended to each rule `name` to form the final
#               metric name (e.g. meter_kafka_broker_cpu_time_total) - confirm.
filter: "{ tags -> tags.job_name == 'kafka-monitoring' }" # The OpenTelemetry job name
expSuffix: tag({tags -> tags.cluster = 'kafka::' + tags.cluster}).instance(['cluster'], ['broker'], Layer.KAFKA)
metricPrefix: meter_kafka_broker
metricsRules:

  # Process CPU seconds multiplied by 100, summed per cluster/broker, then
  # passed through rate() with a one-minute window ('PT1M').
  - name: cpu_time_total
    exp: (process_cpu_seconds_total * 100).sum(['cluster', 'broker']).rate('PT1M')

  # JVM heap usage as a percentage: used bytes (x100) divided by max bytes.
  # Both sides keep only samples whose `area` tag matches "heap" and are
  # summed by cluster/broker/area before the division.
  - name: memory_usage_percentage
    exp: (jvm_memory_bytes_used * 100).tagMatch("area", "heap").sum(['cluster', 'broker', 'area']) / (jvm_memory_bytes_max).tagMatch("area", "heap").sum(['cluster', 'broker', 'area'])

  # Broker-level incoming message rate: the messagesin_total series summed
  # per cluster/broker, then rate() over a one-minute window.
  - name: messages_per_second
    exp: kafka_server_brokertopicmetrics_messagesin_total.sum(['cluster', 'broker']).rate('PT1M')

  # Broker-level inbound byte rate: the bytesin_total series summed per
  # cluster/broker, then rate() over a one-minute window.
  - name: bytes_in_per_second
    exp: kafka_server_brokertopicmetrics_bytesin_total.sum(['cluster', 'broker']).rate('PT1M')

  # Broker-level outbound byte rate: the bytesout_total series summed per
  # cluster/broker, then rate() over a one-minute window.
  - name: bytes_out_per_second
    exp: kafka_server_brokertopicmetrics_bytesout_total.sum(['cluster', 'broker']).rate('PT1M')

  # Inbound replication byte rate: the replicationbytesin_total series summed
  # per cluster/broker, then rate() over a one-minute window.
  - name: replication_bytes_in_per_second
    exp: kafka_server_brokertopicmetrics_replicationbytesin_total.sum(['cluster', 'broker']).rate('PT1M')

  # Outbound replication byte rate: the replicationbytesout_total series
  # summed per cluster/broker, then rate() over a one-minute window.
  - name: replication_bytes_out_per_second
    exp: kafka_server_brokertopicmetrics_replicationbytesout_total.sum(['cluster', 'broker']).rate('PT1M')

  # Under-replicated partition value summed per cluster/broker. No rate() is
  # applied, so the summed sample value is reported as-is.
  - name: under_replicated_partitions
    exp: kafka_server_replicamanager_underreplicatedpartitions.sum(['cluster', 'broker'])

  # Under-min-ISR partition count summed per cluster/broker. No rate() is
  # applied, so the summed sample value is reported as-is.
  - name: under_min_isr_partition_count
    exp: kafka_server_replicamanager_underminisrpartitioncount.sum(['cluster', 'broker'])

  # Replica-manager partition count summed per cluster/broker (no rate()).
  - name: partition_count
    exp: kafka_server_replicamanager_partitioncount.sum(['cluster', 'broker'])

  # Replica-manager leader count summed per cluster/broker (no rate()).
  - name: leader_count
    exp: kafka_server_replicamanager_leadercount.sum(['cluster', 'broker'])

  # ISR shrink rate: the isrshrinks_total series summed per cluster/broker,
  # then rate() over a one-minute window.
  - name: isr_shrinks_per_second
    exp: kafka_server_replicamanager_isrshrinks_total.sum(['cluster', 'broker']).rate('PT1M')
    
  # ISR expand rate: the isrexpands_total series summed per cluster/broker,
  # then rate() over a one-minute window.
  - name: isr_expands_per_second
    exp: kafka_server_replicamanager_isrexpands_total.sum(['cluster', 'broker']).rate('PT1M')

  # Replica fetcher manager max-lag value summed per cluster/broker (no rate()).
  - name: max_lag
    exp: kafka_server_replicafetchermanager_maxlag.sum(['cluster', 'broker'])

  # Delayed-operation purgatory size, limited to samples whose
  # `delayedOperation` tag matches "Produce|Fetch" and summed per
  # cluster/broker/delayedOperation, so each operation type stays separate.
  - name: purgatory_size
    exp: kafka_server_delayedoperationpurgatory_purgatorysize.tagMatch("delayedOperation", "Produce|Fetch").sum(['cluster', 'broker','delayedOperation'])

  # GC collection count rate, kept separate per `gc` tag value. Only samples
  # whose `gc` tag matches "G1 Young Generation|G1 Old Generation" are used.
  # NOTE(review): the pattern lists only G1 collector names, so brokers using
  # a different collector would presumably yield no data here - confirm.
  - name: garbage_collector_count
    exp: jvm_gc_collection_seconds_count.tagMatch("gc", "G1 Young Generation|G1 Old Generation").sum(['cluster', 'broker','gc']).rate('PT1M')

  # Request rate per request type: requests_total limited to `request` tag
  # values matching "FetchConsumer|Produce|Fetch", summed per
  # cluster/broker/request, then rate() over a one-minute window.
  # NOTE(review): this pattern uses "Fetch" where the request-timing rules in
  # this file use "FetchFollower" - confirm the difference is intentional.
  - name: requests_per_second
    exp: kafka_network_requestmetrics_requests_total.tagMatch("request", "FetchConsumer|Produce|Fetch").sum(['cluster','broker','request']).rate('PT1M')

  # Request-queue-time series for Produce / FetchConsumer / FetchFollower
  # requests, summed per cluster/broker/request, then rate() over one minute.
  # NOTE(review): the expression rates the `_count` series, which looks like a
  # number of observations rather than a duration in ms - confirm this is the
  # intended input for a metric named `*_time_ms`.
  - name: request_queue_time_ms
    exp: kafka_network_requestmetrics_requestqueuetimems_count.tagMatch("request", "Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')

  # Remote-time series for Produce / FetchConsumer / FetchFollower requests,
  # summed per cluster/broker/request, then rate() over one minute.
  # NOTE(review): the expression rates the `_count` series, which looks like a
  # number of observations rather than a duration in ms - confirm this is the
  # intended input for a metric named `*_time_ms`.
  - name: remote_time_ms
    exp: kafka_network_requestmetrics_remotetimems_count.tagMatch("request", "Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')

  # Response-queue-time series for Produce / FetchConsumer / FetchFollower
  # requests, summed per cluster/broker/request, then rate() over one minute.
  # NOTE(review): the expression rates the `_count` series, which looks like a
  # number of observations rather than a duration in ms - confirm this is the
  # intended input for a metric named `*_time_ms`.
  - name: response_queue_time_ms
    exp: kafka_network_requestmetrics_responsequeuetimems_count.tagMatch("request", "Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')

  # Response-send-time series for Produce / FetchConsumer / FetchFollower
  # requests, summed per cluster/broker/request, then rate() over one minute.
  # NOTE(review): the expression rates the `_count` series, which looks like a
  # number of observations rather than a duration in ms - confirm this is the
  # intended input for a metric named `*_time_ms`.
  - name: response_send_time_ms
    exp: kafka_network_requestmetrics_responsesendtimems_count.tagMatch("request", "Produce|FetchConsumer|FetchFollower").sum(['cluster','broker','request']).rate('PT1M')

  # Network processor average idle value multiplied by 100, summed per
  # cluster/broker, then passed through rate() over a one-minute window.
  # NOTE(review): the series name suggests an already-averaged percentage
  # rather than a cumulative counter; applying rate() to such a value would
  # report its change, not its level - confirm rate() is intended here.
  - name: network_processor_avg_idle_percent
    exp: (kafka_network_socketserver_networkprocessoravgidlepercent * 100).sum(['cluster','broker']).rate('PT1M')

  # Per-topic incoming message total: the messagesin_total series summed per
  # cluster/broker/topic. No rate() is applied, so the summed value itself is
  # reported.
  - name: topic_messages_in_total
    exp: kafka_server_brokertopicmetrics_messagesin_total.sum(['cluster','broker','topic'])

  # Per-topic outbound byte rate: the bytesout_total series summed per
  # cluster/broker/topic, then rate() over a one-minute window.
  - name: topic_bytesout_per_second
    exp: kafka_server_brokertopicmetrics_bytesout_total.sum(['cluster','broker','topic']).rate('PT1M')

  # Per-topic inbound byte rate: the bytesin_total series summed per
  # cluster/broker/topic, then rate() over a one-minute window.
  # This reads the bytes-IN series (the same one the broker-level
  # bytes_in_per_second rule uses); it previously read bytesout_total, which
  # made it a duplicate of topic_bytesout_per_second.
  - name: topic_bytesin_per_second
    exp: kafka_server_brokertopicmetrics_bytesin_total.sum(['cluster','broker','topic']).rate('PT1M')

  # Per-topic fetch request rate: the totalfetchrequests_total series summed
  # per cluster/broker/topic, then rate() over a one-minute window.
  - name: topic_fetch_requests_per_second
    exp: kafka_server_brokertopicmetrics_totalfetchrequests_total.sum(['cluster','broker','topic']).rate('PT1M')

  # Per-topic produce request rate: the totalproducerequests_total series
  # summed per cluster/broker/topic, then rate() over a one-minute window.
  - name: topic_produce_requests_per_second
    exp: kafka_server_brokertopicmetrics_totalproducerequests_total.sum(['cluster','broker','topic']).rate('PT1M')



